Весь код и используемые файлы можно найти тут(кликабельно)
В транспьютерной матрице размером 5*5, в каждом узле которой находится один процесс, необходимо выполнить операцию рассылки данных (длиной 4 байта) всем процессам от одного (MPI_SCATTER) - процесса с координатами (0,0).
Реализовать программу, моделирующую выполнение операции MPI_SCATTER на транспьютерной матрице при помощи пересылок MPI типа точка-точка.
Получить временную оценку работы алгоритма. Оценить сколько времени потребуется для выполнения операции MPI_SCATTER, если все процессы выдали ее одновременно. Время старта равно 100, время передачи байта равно 1 (Ts=100,Tb=1). Процессорные операции, включая чтение из памяти и запись в память, считаются бесконечно быстрыми.
Доработать MPI-программу, реализованную в рамках курса “Суперкомпьютеры и параллельная обработка данных”.
Добавить контрольные точки для продолжения работы программы в случае сбоя. Реализовать один из 3-х сценариев работы после сбоя:
По условию необходимо осуществить передачу данных(4 байта(инт)) от узла (0, 0) до всех остальных узлов. Так же в условии сказано, что матрица размера (5,5), но решение можно обобщить до произвольной квадратной матрицы(размерности 2 и более).
Поскольку передачи в транспьютерной матрице возможны только к соседним узлам, то можно вычислять расстояние между какими-либо ячейками в количестве этих передач. А именно оно равно сумме модулей разности между координатами узлов, и это будет минимальное количество передач, потому что при меньшем количестве передач можно прийти к логичесткому противоречию с условием передачи данных только соседним узлам.
На данной диаграмме можно увидеть схему работы программы.
Она сначала распространяет число между осями с началом в со всеми узлами с первой нулевой координатой. Далее все эти процессы начинают передавать число вниз по цепочке до последнего. И получается, что самая долгая передача до узла с индексами (N-1, N-1). Причёт отличительная особенно алгоритма в том, что фактически никаких синхронизации в нём не нужны, каждый процесс знает от кого он принимает данные и кому он должен их передать, поэтому каждый процесс может завершаться сразу же после выполнения своих функций, не дожидаясь завершения всех остальных процессов.
Время одной передачи вычисляется как время старта + время на передачу байтаколичество байт. В нашем случае это формула имеет 100 + 1 4 = 104 для одной передачи. Несложно вычислить, что количество передач до узла (N-1, N-1) равно N-1 + N-1 = 2*N -2. Если подставить N из условия, то есть 5, то получается ответ 832. А ниже представлен обобщённый график.
import plotly.express as px
def calculate_time(size, ts = 100, tb = 1, b_count = 4):
return (size-1) * 2 * (ts + tb * b_count)
data = {
"x":[*range(2, 20)]
}
data["y"] = list(map(calculate_time, data["x"]))
fig = px.line(data, labels= {"x":"Size of the transputer matrix", "y":"MPI_SCATTER elapsed time"}, markers=True,
x="x", y="y",title="The dependence of the execution time on the size of the transputer matrix")
fig.show()
Было принято решение о расставлении контрольных точек и в случае каких-либо сбоев осуществляется возврат к этим контрольным точкам, удаление повреждённых процессов, создание новых и последующая синхронизация. Более подробно алгоритм можно увидеть на данной блоксхеме.
Такой подход позволяет программе спокойно выполниться, независимо от количества сбоев, потому что она может порождать неограниченное каким-то фиксированным числом количество процессов.
Основная часть кода - обработка ошибочного состояния процесса, его выделение из общего коммуникатора, удаление, создание нового процесса и его интеграция в существующий коммуникатор таким образом, чтобы сохранить номер утерянного процесса для того, чтобы не нагружать полезный код заботой о правильном распределении работы между старыми и новыми процессами.
При возникновении какой-либо ошибки MPI директивы об это уведомляются все процессы группы(основного коммуникатора) и они сразу откатываются к контрольной точке.
Механизм возникновения ошибок реализован через генератор случайных чисел. Каждый процесс на каждой итерации алгоритма с рансом 1/100 может выйти из строя и завершиться аварийно.(кроме 0 процесса, потому что он является ведущим). Если происходит данная ситуациия, то весь остальной код узнает об этом только тогда, когда прочитает сообщение статуса MPI директивы, а после этого уведомит все процессы группы. После этого последует полный откат всех вычислений и восстановление предыдущей итерации основного цикла.
Можно запускать прилагаемый код c помощью bash script файла start.sh. С помощью команды . start.sh <количество процессов>
Содержимое файла start.sh:
mpicc main.c -o test -lm
mpiexec -n $1 ./test
Для начала нужно запустить docker container. Это можно сделать c помощью прилагаемого bash script файла start_docker.sh. С помощью команды . start_docker.sh
Содержимое файла start_docker.sh:
ulfm_image=abouteiller/mpi-ft-ulfm
docker pull $ulfm_image
Можно запускать прилагаемый файл start.sh c помощью bash script файла start. С помощью команды . start.sh <количество процессов>
Содержимое файла start.sh:
function mpirun {
docker run --user $(id -u):$(id -g) --cap-drop=all --security-opt label:disabled -v $PWD:/sandbox $ulfm_image mpirun --map-by :oversubscribe --mca btl tcp,self $@
}
function mpicc {
docker run --user $(id -u):$(id -g) --cap-drop=all --security-opt label:disabled -v $PWD:/sandbox $ulfm_image mpicc $@
}
mpicc mpi_sor_3d_upgraded.c -o test
mpirun --with-ft ulfm -n $1 ./test
Был реализован алгоритм MPI_Scatter с использованием пересылок типа точка-точка и средств MPI. Также была выведена теоретическая временная оценка предложенного алгоритма.
Была реализована надёжная версия алгоримта Sor_3D, которая устойчива к смерти условно бесконечного числа процессов. Этого удалось достичь, благодаря реализации метода создания асинхронных контрольных точек и восстановления из контрольных точек утерянных процессов.
main.c:
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <mpi.h>
MPI_Request send_int(int x, int y, MPI_Comm comm, int *data) {
int target_rank;
int coords[2] = {x, y};
MPI_Cart_rank(comm, coords, &target_rank);
MPI_Request request;
MPI_Isend(data, 1, MPI_INT, target_rank,
0, MPI_COMM_WORLD, &request);
return request;
}
int receive_int(int x, int y, MPI_Comm comm) {
int target_rank;
int coords[2] = {x, y};
MPI_Cart_rank(comm, coords, &target_rank);
int data;
MPI_Recv(&data, 1, MPI_INT, target_rank,
MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
return data;
}
int main(int argc, char **argv) {
int debug = 1;
int size = 3;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
size = (int) sqrt((double) size);
int rank, r_size;
int ndims = 2;
int n_size[2] = {size, size};
int periodic[2] = {0, 0};
MPI_Comm cart_comm;
MPI_Cart_create(MPI_COMM_WORLD, ndims, n_size, periodic, 1, &cart_comm);
MPI_Comm_rank(cart_comm, &rank);
MPI_Comm_size(cart_comm, &r_size);
int mpi_map[size][size];
for (int i = 0; i < size * size; i++) {
int coords[2];
MPI_Cart_coords(cart_comm, i, ndims, coords);
mpi_map[coords[0]][coords[1]] = i;
}
srand(r_size + size);
int coords[2];
MPI_Cart_coords(cart_comm, rank, ndims, coords);
// printf("My id = %2d, my coords is <%2d, %2d>\n", rank, coords[0], coords[1]);
int number;
if (coords[0] == 0) {
if (coords[1] == 0) {
number = rand();
} else {
number = receive_int(coords[0], coords[1] - 1, cart_comm);
printf("Got number, my coords is <%2d, %2d>\n", coords[0], coords[1]);
}
MPI_Request r1, r2;
if (coords[1] + 1 != size) {
r1 = send_int(coords[0], coords[1] + 1, cart_comm, &number);
}
r2 = send_int(coords[0] + 1, coords[1], cart_comm, &number);
if (coords[1] + 1 != size) {
MPI_Wait(&r1, MPI_STATUS_IGNORE);
}
MPI_Wait(&r2, MPI_STATUS_IGNORE);
} else {
number = receive_int(coords[0] - 1, coords[1], cart_comm);
printf("Got number, my coords is <%2d, %2d>\n", coords[0], coords[1]);
MPI_Request r1;
if (coords[0] + 1 != size) {
r1 = send_int(coords[0] + 1, coords[1], cart_comm, &number);
MPI_Wait(&r1, MPI_STATUS_IGNORE);
}
}
MPI_Finalize();
return 0;
}
mpi_sor_3d_upgraded.c:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <math.h>
#include <mpi.h>
#include <mpi-ext.h>
#include <time.h>
int rank = MPI_PROC_NULL, verbose = 1; /* makes this global (for printfs) */
char **gargv;
int MPIX_Comm_replace(MPI_Comm comm, MPI_Comm *newcomm) {
MPI_Comm icomm, /* the intercomm between the spawnees and the old (shrinked) world */
scomm, /* the local comm for each sides of icomm */
mcomm; /* the intracomm, merged from icomm */
MPI_Group cgrp, sgrp, dgrp;
int rc, flag, rflag, i, nc, ns, nd, crank, srank, drank;
redo:
if (comm == MPI_COMM_NULL) { /* am I a new process? */
/* I am a new spawnee, waiting for my new rank assignment
* it will be sent by rank 0 in the old world */
MPI_Comm_get_parent(&icomm);
scomm = MPI_COMM_WORLD;
MPI_Recv(&crank, 1, MPI_INT, 0, 1, icomm, MPI_STATUS_IGNORE);
if (verbose) {
MPI_Comm_rank(scomm, &srank);
printf("Spawnee %d: crank=%d\n", srank, crank);
}
} else {
/* I am a survivor: Spawn the appropriate number
* of replacement processes (we check that this operation worked
* before we process further) */
/* First: remove dead processes */
MPIX_Comm_shrink(comm, &scomm);
MPI_Comm_size(scomm, &ns);
MPI_Comm_size(comm, &nc);
nd = nc - ns; /* number of deads */
if (0 == nd) {
/* Nobody was dead to start with. We are done here */
MPI_Comm_free(&scomm);
*newcomm = comm;
return MPI_SUCCESS;
}
/* We handle failures during this function ourselves... */
MPI_Comm_set_errhandler(scomm, MPI_ERRORS_RETURN);
rc = MPI_Comm_spawn(gargv[0], &gargv[1], nd, MPI_INFO_NULL,
0, scomm, &icomm, MPI_ERRCODES_IGNORE);
flag = (MPI_SUCCESS == rc);
MPIX_Comm_agree(scomm, &flag);
if (!flag) {
if (MPI_SUCCESS == rc) {
MPIX_Comm_revoke(icomm);
MPI_Comm_free(&icomm);
}
MPI_Comm_free(&scomm);
if (verbose) fprintf(stderr, "%04d: comm_spawn failed, redo\n", rank);
goto redo;
}
/* remembering the former rank: we will reassign the same
* ranks in the new world. */
MPI_Comm_rank(comm, &crank);
MPI_Comm_rank(scomm, &srank);
/* the rank 0 in the scomm comm is going to determine the
* ranks at which the spares need to be inserted. */
if (0 == srank) {
/* getting the group of dead processes:
* those in comm, but not in scomm are the deads */
MPI_Comm_group(comm, &cgrp);
MPI_Comm_group(scomm, &sgrp);
MPI_Group_difference(cgrp, sgrp, &dgrp);
/* Computing the rank assignment for the newly inserted spares */
for (i = 0; i < nd; i++) {
MPI_Group_translate_ranks(dgrp, 1, &i, cgrp, &drank);
/* sending their new assignment to all new procs */
MPI_Send(&drank, 1, MPI_INT, i, 1, icomm);
}
MPI_Group_free(&cgrp);
MPI_Group_free(&sgrp);
MPI_Group_free(&dgrp);
}
}
/* Merge the intercomm, to reconstruct an intracomm (we check
* that this operation worked before we proceed further) */
rc = MPI_Intercomm_merge(icomm, 1, &mcomm);
rflag = flag = (MPI_SUCCESS == rc);
MPIX_Comm_agree(scomm, &flag);
if (MPI_COMM_WORLD != scomm) MPI_Comm_free(&scomm);
MPIX_Comm_agree(icomm, &rflag);
MPI_Comm_free(&icomm);
if (!(flag && rflag)) {
if (MPI_SUCCESS == rc) {
MPI_Comm_free(&mcomm);
}
if (verbose) fprintf(stderr, "%04d: Intercomm_merge failed, redo\n", rank);
goto redo;
}
/* Now, reorder mcomm according to original rank ordering in comm
* Split does the magic: removing spare processes and reordering ranks
* so that all surviving processes remain at their former place */
rc = MPI_Comm_split(mcomm, 1, crank, newcomm);
/* Split or some of the communications above may have failed if
* new failures have disrupted the process: we need to
* make sure we succeeded at all ranks, or retry until it works. */
flag = (MPI_SUCCESS == rc);
MPIX_Comm_agree(mcomm, &flag);
MPI_Comm_free(&mcomm);
if (!flag) {
if (MPI_SUCCESS == rc) {
MPI_Comm_free(newcomm);
}
if (verbose) fprintf(stderr, "%04d: comm_split failed, redo\n", rank);
goto redo;
}
/* restore the error handler */
if (MPI_COMM_NULL != comm) {
MPI_Errhandler errh;
MPI_Comm_get_errhandler(comm, &errh);
MPI_Comm_set_errhandler(*newcomm, errh);
}
return MPI_SUCCESS;
}
#define Max(a, b) ((a)>(b)?(a):(b))
#define N ((2<<5)+2)
int slider = 0;
double maxeps = 0.1e-7;
double A[2][N][N][N];
double receive_buf[N][N][N];
void init();
void verify();
int main(int argc, char **argv) {
MPI_Comm world; /* a world comm for the work, w/o the spares */
MPI_Comm rworld; /* and a temporary handle to store the repaired copy */
int size, spare;
int ret_code; /* error code from MPI functions */
char estr[MPI_MAX_ERROR_STRING] = "";
int strl; /* error messages */
double start, tff = 0, twf = 0; /* timings */
int was_error;
gargv = argv;
MPI_Init(&argc, &argv);
/* Am I a spare ? */
MPI_Comm_get_parent(&world);
if (MPI_COMM_NULL == world) {
/* First run: Let's create an initial world,
* a copy of MPI_COMM_WORLD */
MPI_Comm_dup(MPI_COMM_WORLD, &world);
MPI_Comm_size(world, &size);
MPI_Comm_rank(world, &rank);
/* We set an errhandler on world, so that a failure is not fatal anymore. */
MPI_Comm_set_errhandler(world, MPI_ERRORS_RETURN);
} else {
/* I am a spare, lets get the repaired world */
MPIX_Comm_replace(MPI_COMM_NULL, &world);
MPI_Comm_size(world, &size);
MPI_Comm_rank(world, &rank);
/* We set an errhandler on world, so that a failure is not fatal anymore. */
MPI_Comm_set_errhandler(world, MPI_ERRORS_RETURN);
//without goto
}
double time_start = MPI_Wtime();
int line_start[size];
int line_count[size];
int n2 = N * N;
for (int i = 0; i < size; i++) {
line_count[i] = n2 / size;
}
for (int i = 0; i < n2 - (n2 / size) * size; i++) {
line_count[i]++;
}
line_start[0] = 0;
for (int i = 1; i < size; i++) {
line_start[i] = line_start[i - 1] + line_count[i - 1];
}
if (rank == -1) { // just debug information
for (int i = 0; i < size; i++) {
printf("%5d", line_start[i]);
}
printf("\n");
for (int i = 0; i < size; i++) {
printf("%5d", line_count[i]);
}
printf("\n");
}
int receive_counts[size];
int start_offset[size];
for (int i = 0; i < size; i++) {
receive_counts[i] = line_count[i] * N;
start_offset[i] = line_start[i] * N;
}
if (rank == 0) { //init only in main process
init();
}
int itmax = 100;
srand(time(NULL));
for (int iteration = 1; iteration < itmax; ++iteration) {
MPI_Bcast(&iteration, 1, MPI_INT, 0, world);
if ((rand() % 100 == 1) && rank != 0) {
printf("Rank %04d: committing suicide\n", rank);
raise(SIGKILL);
}
ret_code = MPI_Bcast(A[slider], N * N * N, MPI_DOUBLE, 0, world);;
was_error = ret_code != MPI_SUCCESS;
MPI_Allreduce(&was_error, &was_error, 1, MPI_INT, MPI_MAX, world);
if (was_error) {
printf("Bcast failed, restarting... %d\n", iteration);
MPIX_Comm_replace(world, &rworld);
MPI_Comm_free(&world);
world = rworld;
--iteration;
continue;
}
double eps = 0.;
//relax;
int k;
for (int iterat = line_start[rank]; iterat < line_start[rank] + line_count[rank]; iterat++) {
int i_cof = iterat / N;
int j_cof = iterat % N;
if (i_cof >= 1 && i_cof <= (N - 2) && j_cof >= 1 && j_cof <= (N - 2)) {
for (k = 1; k <= N - 2; k++) {
double e = A[slider][i_cof][j_cof][k];
A[slider ^ 1][i_cof][j_cof][k] = (A[slider][i_cof - 1][j_cof][k] + A[slider][i_cof + 1][j_cof][k]
+ A[slider][i_cof][j_cof - 1][k] + A[slider][i_cof][j_cof + 1][k]
+ A[slider][i_cof][j_cof][k - 1] +
A[slider][i_cof][j_cof][k + 1]) / 6.;
eps = Max(eps, fabs(e - A[slider ^ 1][i_cof][j_cof][k]));
}
}
}
slider ^= 1;
ret_code = MPI_Gatherv(A[slider][line_start[rank] / N][line_start[rank] % N], line_count[rank] * N, MPI_DOUBLE,
receive_buf[line_start[rank] / N][line_start[rank] % N], receive_counts, start_offset,
MPI_DOUBLE, 0, world);
slider ^= 1;
memcpy(A[slider], receive_buf, sizeof(double) * N * N * N);
was_error = ret_code != MPI_SUCCESS;
MPI_Allreduce(&was_error, &was_error, 1, MPI_INT, MPI_MAX, world);
if (was_error) {
printf("Gatherv failed, restarting... %d\n", iteration);
MPIX_Comm_replace(world, &rworld);
MPI_Comm_free(&world);
world = rworld;
--iteration;
continue;
}
if (rank == 0) { // just debug information
printf("it=%4i slider = %3d \n", iteration, slider);
verify();
}
}
MPI_Comm_free(&world);
MPI_Finalize();
return EXIT_SUCCESS;
}
void init() {
int i, j, k;
for (i = 0; i <= N - 1; i++) {
for (j = 0; j <= N - 1; j++) {
for (k = 0; k <= N - 1; k++) {
if (i == 0 || i == N - 1 || j == 0 || j == N - 1 || k == 0 || k == N - 1) {
A[slider][i][j][k] = 0.;
} else {
A[slider][i][j][k] = (4. + i + j + k);
}
}
}
}
}
void verify() {
double s = 0.;
int i, j, k;
for (i = 0; i <= N - 1; i++) {
for (j = 0; j <= N - 1; j++) {
for (k = 0; k <= N - 1; k++) {
s = s + A[slider][i][j][k] * (i + 1) * (j + 1) * (k + 1) / (N * N * N);
}
}
}
}